package com.crazymaker.springcloud.stock.service.impl;

import com.crazymaker.l2cache.cluster.level2.canal.DataLoader;
import com.crazymaker.springcloud.seckill.api.dto.SeckillDTO;
import com.crazymaker.springcloud.seckill.api.dto.SeckillSkuDTO;
import com.jd.platform.hotkey.client.callback.JdHotKeyStore;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import static com.crazymaker.springcloud.stock.constant.Constants.SECKILL_SKU;

//@Service
/**
 * {@link DataLoader} implementation for the seckill SKU table.
 *
 * <p>Only the table/region named by {@code SECKILL_SKU} (compared case-insensitively) is
 * handled. For any other table or region, including {@code null}, every method returns
 * {@code null}.
 *
 * <p>Cache keys are the raw {@code sku_id} column values of the changed rows; cache values are
 * the JSON form of the SKU detail loaded through {@link SeckillSkuStockServiceImpl}.
 */
public class CannalDataLoaderImpl implements DataLoader {

    /** Name of the row column that holds the SKU id. */
    private static final String SKU_ID_COLUMN = "sku_id";

    /** Marker entry written into a row whose SKU key was found in the hot-key store. */
    private static final String HOTTED_FLAG = "hotted";

    /**
     * Row filter that keeps only rows whose SKU key is not in the hot-key store.
     * See {@link #isNotHotKey(Map)} for the side effect on rejected rows.
     */
    private static final Predicate<Map<String, String>> SECKILL_SKU_NO_HOT_PREDICATE =
            CannalDataLoaderImpl::isNotHotKey;

    @Resource
    SeckillSkuStockServiceImpl seckillSkuStockService;

    /**
     * Maps a table to its cache region.
     *
     * @param database source database name (not used)
     * @param table    source table name, may be {@code null}
     * @return {@code SECKILL_SKU} for the seckill SKU table, otherwise {@code null}
     */
    @Override
    public String getRegion(String database, String table) {
        return isSeckillSku(table) ? SECKILL_SKU : null;
    }

    /**
     * Extracts the cache key of a single row.
     *
     * @param database source database name (not used)
     * @param table    source table name, may be {@code null}
     * @param row      column name to column value mapping of the row
     * @return the row's {@code sku_id} value, or {@code null} if the table is not handled
     */
    @Override
    public String getKey(String database, String table, Map<String, String> row) {
        if (!isSeckillSku(table)) {
            return null;
        }
        return row.get(SKU_ID_COLUMN);
    }

    /**
     * Extracts the cache keys of a batch of rows, in row order.
     *
     * @param database source database name (not used)
     * @param table    source table name, may be {@code null}
     * @param rows     rows to extract keys from
     * @return one {@code sku_id} value per row, or {@code null} if the table is not handled
     */
    @Override
    public String[] getKeys(String database, String table, List<Map<String, String>> rows) {
        if (!isSeckillSku(table)) {
            return null;
        }
        return rows.stream()
                .map(row -> row.get(SKU_ID_COLUMN))
                .toArray(String[]::new);
    }

    /**
     * Extracts the cache keys of the rows that are not currently hot keys.
     *
     * <p>Side effect: every row that is skipped because its key is hot gets the entry
     * {@code "hotted" -> "1"} added to its map.
     *
     * @param database source database name (not used)
     * @param table    source table name, may be {@code null}
     * @param rows     rows to filter; the row maps must be mutable
     * @return {@code sku_id} values of the non-hot rows, or {@code null} if the table is not
     *         handled
     */
    @Override
    public String[] getNoHotKeys(String database, String table, List<Map<String, String>> rows) {
        if (!isSeckillSku(table)) {
            return null;
        }
        return rows.stream()
                .filter(SECKILL_SKU_NO_HOT_PREDICATE)
                .map(row -> row.get(SKU_ID_COLUMN))
                .toArray(String[]::new);
    }

    /**
     * Loads the cache value for a changed row.
     *
     * @param database source database name (not used)
     * @param table    source table name, may be {@code null}
     * @param e        column name to column value mapping of the row
     * @return JSON of the SKU detail, or {@code null} if the table is not handled
     * @throws NumberFormatException if the row's {@code sku_id} is missing or not a valid long
     */
    @Override
    public String getCacheData(String database, String table, Map<String, String> e) {
        if (!isSeckillSku(table)) {
            return null;
        }
        return loadSkuJson(e.get(SKU_ID_COLUMN));
    }

    /**
     * Loads the cache value for a key within a region.
     *
     * @param region cache region name, may be {@code null}
     * @param id     SKU id in decimal string form
     * @return JSON of the SKU detail, or {@code null} if the region is not handled
     * @throws NumberFormatException if {@code id} is {@code null} or not a valid long
     */
    @Override
    public String getCacheData(String region, String id) {
        if (!isSeckillSku(region)) {
            return null;
        }
        return loadSkuJson(id);
    }

    /** Returns whether the given table/region name denotes the seckill SKU table; null-safe. */
    private static boolean isSeckillSku(String tableOrRegion) {
        // Constant as receiver: equalsIgnoreCase(null) is false, so a null name is "not handled".
        return SECKILL_SKU.equalsIgnoreCase(tableOrRegion);
    }

    /** Parses the SKU id, loads its detail from the stock service and serializes it to JSON. */
    private String loadSkuJson(String rawSkuId) {
        Long skuId = Long.valueOf(rawSkuId);
        SeckillSkuDTO dto = seckillSkuStockService.detail(skuId);
        return SeckillDTO.toJson(dto);
    }

    /**
     * Tests whether a row's SKU key is absent from the hot-key store, looking it up under
     * {@code SECKILL_SKU + ":" + sku_id}.
     *
     * <p>When the lookup returns a value the row is tagged with {@code "hotted" -> "1"}.
     * NOTE(review): presumably {@code JdHotKeyStore.getValueSimple} returns {@code null} for
     * keys that are not hot; confirm against the hotkey client.
     *
     * @param row row to test; mutated when its key is hot
     * @return {@code true} if the lookup returned {@code null}, {@code false} otherwise
     */
    private static boolean isNotHotKey(Map<String, String> row) {
        String hotkey = SECKILL_SKU + ":" + row.get(SKU_ID_COLUMN);
        if (JdHotKeyStore.getValueSimple(hotkey) == null) {
            return true;
        }
        // Hot key: tag the row, presumably so later processing can recognise it.
        row.put(HOTTED_FLAG, "1");
        return false;
    }

}
